/*
 * Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.hadoop.fs.contract;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.contract.ContractTestUtils.compareByteArrays;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
import static org.assertj.core.api.Assertions.fail;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

/**
 * Test Open operations.
 */
public abstract class AbstractContractOpenTest
    extends AbstractFSContractTestBase {

  private FSDataInputStream instream;

  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    conf.setInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, 4096);
    return conf;
  }

  @AfterEach
  @Override
  public void teardown() throws Exception {
    IOUtils.closeStream(instream);
    instream = null;
    super.teardown();
  }

  @Test
  public void testOpenReadZeroByteFile() throws Throwable {
    describe("create & read a 0 byte file");
    Path path = path("zero.txt");
    touch(getFileSystem(), path);
    instream = getFileSystem().open(path);
    Assertions.assertThat(instream.getPos())
        .isEqualTo(0);
    //expect initial read to fail
    assertMinusOne("initial byte read", instream.read());
  }

  @Test
  public void testFsIsEncrypted() throws Exception {
    describe("create an empty file and call FileStatus.isEncrypted()");
    final Path path = path("file");
    createFile(getFileSystem(), path, false, new byte[0]);
    final FileStatus stat = getFileSystem().getFileStatus(path);
    Assertions.assertThat(stat.isEncrypted())
        .withFailMessage("Result wrong for for isEncrypted() in " + stat)
        .isEqualTo(areZeroByteFilesEncrypted());
  }

  /**
   * Are zero byte files encrypted. This is implicitly
   * false for filesystems which do not encrypt.
   * @return true iff zero byte files are encrypted.
   */
  protected boolean areZeroByteFilesEncrypted() {
    return false;
  }

  @Test
  public void testOpenReadDir() throws Throwable {
    describe("create & read a directory");
    Path path = path("zero.dir");
    mkdirs(path);
    try {
      instream = getFileSystem().open(path);
      //at this point we've opened a directory
      fail("A directory has been opened for reading");
    } catch (FileNotFoundException e) {
      handleExpectedException(e);
    } catch (IOException e) {
      handleRelaxedException("opening a directory for reading",
                             "FileNotFoundException",
                             e);
    }
  }

  @Test
  public void testOpenReadDirWithChild() throws Throwable {
    describe("create & read a directory which has a child");
    Path path = path("zero.dir");
    mkdirs(path);
    Path path2 = new Path(path, "child");
    mkdirs(path2);

    try {
      instream = getFileSystem().open(path);
      //at this point we've opened a directory
      fail("A directory has been opened for reading");
    } catch (FileNotFoundException e) {
      handleExpectedException(e);
    } catch (IOException e) {
      handleRelaxedException("opening a directory for reading",
                             "FileNotFoundException",
                             e);
    }
  }

  @Test
  public void testOpenFileTwice() throws Throwable {
    describe("verify that two opened file streams are independent");
    Path path = path("testopenfiletwice.txt");
    byte[] block = dataset(TEST_FILE_LEN, 0, 255);
    //this file now has a simple rule: offset => value
    createFile(getFileSystem(), path, true, block);
    //open first
    FSDataInputStream instream1 = getFileSystem().open(path);
    FSDataInputStream instream2 = null;
    try {
      int c = instream1.read();
      Assertions.assertThat(c).isEqualTo(0);
      instream2 = getFileSystem().open(path);
      Assertions.assertThat(instream2.read())
          .withFailMessage("first read of instream 2")
          .isEqualTo(0);
      Assertions.assertThat(instream1.read())
          .withFailMessage("second read of instream 1")
          .isEqualTo(1);
      instream1.close();
      Assertions.assertThat(instream2.read())
          .withFailMessage("second read of instream 2")
          .isEqualTo(1);
      //close instream1 again
      instream1.close();
    } finally {
      IOUtils.closeStream(instream1);
      IOUtils.closeStream(instream2);
    }
  }

  @Test
  public void testSequentialRead() throws Throwable {
    describe("verify that sequential read() operations return values");
    Path path = path("testsequentialread.txt");
    int len = 4;
    int base = 0x40; // 64
    byte[] block = dataset(len, base, base + len);
    //this file now has a simple rule: offset => (value | 0x40)
    createFile(getFileSystem(), path, true, block);
    //open first
    instream = getFileSystem().open(path);
    Assertions.assertThat(instream.read())
        .isEqualTo(base);
    Assertions.assertThat(instream.read())
        .isEqualTo(base + 1);
    Assertions.assertThat(instream.read())
        .isEqualTo(base + 2);
    Assertions.assertThat(instream.read())
        .isEqualTo(base + 3);
    // and now, failures
    Assertions.assertThat(instream.read())
        .isEqualTo(-1);
    Assertions.assertThat(instream.read())
        .isEqualTo(-1);
    instream.close();
  }

  @Test
  public void testOpenFileReadZeroByte() throws Throwable {
    describe("create & read a 0 byte file through the builders; use a negative length");
    Path path = path("zero.txt");
    FileSystem fs = getFileSystem();
    fs.createFile(path).overwrite(true).build().close();
    try (FSDataInputStream is = fs.openFile(path)
        .opt("fs.test.something", true)
        .opt("fs.test.something2", 3)
        .opt("fs.test.something3", "3")
        .optLong(FS_OPTION_OPENFILE_LENGTH, -1L)
        .build().get()) {
      assertMinusOne("initial byte read", is.read());
    }
  }

  @Test
  public void testOpenFileUnknownOption() throws Throwable {
    describe("calling openFile fails when a 'must()' option is unknown");
    FutureDataInputStreamBuilder builder =
        getFileSystem().openFile(path("testOpenFileUnknownOption"))
        .opt("fs.test.something", true)
        .must("fs.test.something", true);
    intercept(IllegalArgumentException.class,
        () -> builder.build());
  }

  @Test
  public void testOpenFileUnknownOptionLong() throws Throwable {
    describe("calling openFile fails when a 'must()' option is unknown");
    FutureDataInputStreamBuilder builder =
        getFileSystem().openFile(path("testOpenFileUnknownOption"))
        .optLong("fs.test.something", 1L)
        .mustLong("fs.test.something2", 1L);
    intercept(IllegalArgumentException.class,
        () -> builder.build());
  }

  @Test
  public void testOpenFileLazyFail() throws Throwable {
    describe("openFile fails on a missing file in the get() and not before");
    FutureDataInputStreamBuilder builder =
        getFileSystem().openFile(path("testOpenFileLazyFail"))
            .opt("fs.test.something", true);
    interceptFuture(FileNotFoundException.class, "", builder.build());
  }

  @Test
  public void testOpenFileFailExceptionally() throws Throwable {
    describe("openFile missing file chains into exceptionally()");
    FutureDataInputStreamBuilder builder =
        getFileSystem().openFile(path("testOpenFileFailExceptionally"))
            .opt("fs.test.something", true);
    Assertions.assertThat(builder.build().exceptionally(ex -> null).get())
        .withFailMessage("exceptional uprating")
        .isNull();
  }

  @Test
  public void testAwaitFutureFailToFNFE() throws Throwable {
    describe("Verify that FutureIOSupport.awaitFuture extracts IOExceptions");
    FutureDataInputStreamBuilder builder =
        getFileSystem().openFile(path("testAwaitFutureFailToFNFE"))
            .opt("fs.test.something", true);
    intercept(FileNotFoundException.class,
        () -> awaitFuture(builder.build()));
  }

  @Test
  public void testAwaitFutureTimeoutFailToFNFE() throws Throwable {
    describe("Verify that FutureIOSupport.awaitFuture with a timeout works");
    FutureDataInputStreamBuilder builder =
        getFileSystem().openFile(path("testAwaitFutureFailToFNFE"))
            .opt("fs.test.something", true);
    intercept(FileNotFoundException.class,
        () -> awaitFuture(builder.build(),
            10, TimeUnit.DAYS));
  }

  @Test
  public void testOpenFileExceptionallyTranslating() throws Throwable {
    describe("openFile missing file chains into exceptionally()");
    CompletableFuture<FSDataInputStream> f = getFileSystem()
        .openFile(path("testOpenFileExceptionallyTranslating")).build();
    interceptFuture(RuntimeException.class,
        "exceptionally",
        f.exceptionally(ex -> {
          throw new RuntimeException("exceptionally", ex);
        }));
  }

  @Test
  public void testChainedFailureAwaitFuture() throws Throwable {
    describe("await Future handles chained failures");
    CompletableFuture<FSDataInputStream> f = getFileSystem()
        .openFile(path("testChainedFailureAwaitFuture"))
        .withFileStatus(null)
        .build();
    intercept(RuntimeException.class,
        "exceptionally",
        () -> awaitFuture(
            f.exceptionally(ex -> {
              throw new RuntimeException("exceptionally", ex);
            })));
  }

  @Test
  public void testOpenFileApplyRead() throws Throwable {
    describe("use the apply sequence to read a whole file");
    Path path = path("testOpenFileApplyRead");
    FileSystem fs = getFileSystem();
    int len = 4096;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    FileStatus st = fs.getFileStatus(path);
    CompletableFuture<Long> readAllBytes = fs.openFile(path)
        .withFileStatus(st)
        .build()
        .thenApply(ContractTestUtils::readStream);
    Assertions.assertThat((long) readAllBytes.get())
        .withFailMessage("Wrong number of bytes read value")
        .isEqualTo(len);
    // now reattempt with a new FileStatus and a different path
    // other than the final name element
    // implementations MUST use path in openFile() call
    FileStatus st2 = new FileStatus(
        len, false,
        st.getReplication(),
        st.getBlockSize(),
        st.getModificationTime(),
        st.getAccessTime(),
        st.getPermission(),
        st.getOwner(),
        st.getGroup(),
        new Path("gopher:///localhost:/" + path.getName()));
    Assertions.assertThat((long) fs.openFile(path)
            .withFileStatus(st2)
            .build()
            .thenApply(ContractTestUtils::readStream)
            .get())
        .withFailMessage("Wrong number of bytes read value")
        .isEqualTo(len);
  }

  @Test
  public void testOpenFileApplyAsyncRead() throws Throwable {
    describe("verify that async accept callbacks are evaluated");
    Path path = path("testOpenFileApplyAsyncRead");
    FileSystem fs = getFileSystem();
    final int len = 512;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    CompletableFuture<FSDataInputStream> future = fs.openFile(path)
        .mustDouble(FS_OPTION_OPENFILE_LENGTH, 43.2e60)   // pass in a double
        .build();
    AtomicBoolean accepted = new AtomicBoolean(false);
    final Long bytes = future.thenApply(stream -> {
      accepted.set(true);
      return ContractTestUtils.readStream(stream);
    }).get();
    Assertions.assertThat(accepted.get())
        .withFailMessage("async accept operation not invoked")
        .isTrue();
    Assertions.assertThat(bytes)
        .describedAs("bytes read from stream")
        .isEqualTo(len);
  }

  /**
   * Open a file with a null status, and the length
   * passed in as an opt() option (along with sequential IO).
   * The file is opened, the data read, and it must match
   * the source data.
   * opt() is used so that integration testing with external
   * filesystem connectors will downgrade if the option is not
   * recognized.
   */
  @Test
  public void testOpenFileNullStatusButFileLength() throws Throwable {
    describe("use openFile() with a null status and expect the status to be"
        + " ignored. block size, fadvise and length are passed in as"
        + " opt() options");
    Path path = path("testOpenFileNullStatus");
    FileSystem fs = getFileSystem();
    int len = 4;
    byte[] result = new byte[len];
    byte[] dataset = dataset(len, 0x40, 0x80);
    createFile(fs, path, true,
        dataset);
    CompletableFuture<FSDataInputStream> future = fs.openFile(path)
        .withFileStatus(null)
        .opt(FS_OPTION_OPENFILE_READ_POLICY,
            "unknown, sequential, random")
        .optLong(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
        .optLong(FS_OPTION_OPENFILE_LENGTH, len)
        .build();

    try (FSDataInputStream in = future.get()) {
      in.readFully(result);
    }
    compareByteArrays(dataset, result, len);
  }

  /**
   * open a file with a length set as a double; verifies resilience
   * of the parser.
   */
  @Test
  public void testFloatingPointLength() throws Throwable {
    describe("Open file with a length");
    Path path = methodPath();
    FileSystem fs = getFileSystem();
    int len = 4096;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    final Long l = fs.openFile(path)
        .mustDouble(FS_OPTION_OPENFILE_LENGTH, len)
        .build()
        .thenApply(ContractTestUtils::readStream)
        .get();
    Assertions.assertThat(l)
        .describedAs("bytes read from file %s", path)
        .isEqualTo(len);
  }

}
